[VL] The POC of supporting Flink in Gluten #8839
Thanks for opening a pull request! Could you open an issue for this pull request on GitHub Issues? https://github.com/apache/incubator-gluten/issues Then could you also rename the commit message and pull request title in the following format?
See also: |
@shuai-xu, thanks for your great work! Could you draft a design doc? Google doc is preferred.
*/
package org.apache.gluten.backendsapi;

public class FlinkBackend {
Worth emphasizing that Flink will not be a backend in Gluten. It is better considered a frontend or a framework.
OK, we can discuss it.
Can we use the gluten-substrait module? There seems to be a lot of duplicated code.
Thank you for the PR! It eventually starts |
Is there a design for this support, for people who are not very familiar with Flink?
This is an initial PR, and we need more people to join us in reviewing the design.
Yes, we need to discuss whether to translate to a Substrait plan or just call the Velox JNI interface directly. If we do need to translate to a Substrait plan, the gluten-substrait module will need some refactoring so that it can be shared between Gluten for Spark and Gluten for Flink.
@PHILO-HE @majetideepak OK, I will write a design doc soon.
Hi @weiting-chen, would you share who is the author of https://docs.google.com/document/d/1VNMs1oR0c02kuQLFLQXZeTyk3CoPjrtAfiKGoLGxGzE/edit?usp=sharing? I am opening a GitHub side discussion here: #8849
Yes, done. I have added the author.
}
}

private native long nativeProcessElement(int executor, long data);
Very glad to see such exciting progress on Flink support.
new BoundSplit(
    "5",
    -1,
    new ExternalStreamConnectorSplit("escs1", es.id())));
Would use connector-external-stream as connector ID. It is currently a fixed value in Velox4j: https://github.com/velox4j/velox4j/blob/434ae37dfc3d5fb79788fe5bce41e41dd17901b5/src/main/cpp/main/velox4j/init/Init.cc#L94-L97
ExternalStream es = session.externalStreamOps().bind(new DownIterator(inputIterator));
List<BoundSplit> splits = List.of(
    new BoundSplit(
        "5",
The planNodeId must be the same as the leaf scan node's node ID, so that Velox knows we bind this split to the scan. Perhaps we can pass the scan node ID into GlutenCalOperator somehow?
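One way to read this suggestion is sketched below. It is a minimal illustration only: `Split`, `ScanNode`, and `Operator` are hypothetical stand-ins defined here, not the velox4j or Gluten classes. The idea is simply that the operator receives the scan node's ID when it is constructed and reuses it for the split, instead of hardcoding "5".

```java
import java.util.List;

// Hypothetical stand-ins for illustration only; these are NOT the velox4j classes.
final class ScanBindingSketch {
  /** Stand-in for a split that is bound to a plan node by that node's ID. */
  record Split(String planNodeId, int groupId, String connectorId) {}

  /** Stand-in for the leaf scan node created at planning time. */
  record ScanNode(String id, String connectorId) {}

  /** Stand-in for an operator that is told the scan node ID when it is built. */
  static final class Operator {
    private final String scanNodeId;
    private final String connectorId;

    Operator(ScanNode scan) {
      // Keep only plain strings so the operator stays cheap to ship around.
      this.scanNodeId = scan.id();
      this.connectorId = scan.connectorId();
    }

    /** Builds the split with the scan node's own ID rather than a literal. */
    List<Split> createSplits() {
      return List.of(new Split(scanNodeId, -1, connectorId));
    }
  }

  public static void main(String[] args) {
    ScanNode scan = new ScanNode("id-1", "connector-external-stream");
    Operator op = new Operator(scan);
    System.out.println(op.createSplits());
  }
}
```

With this shape the split's plan node ID cannot drift from the scan node's ID, because both come from the same object.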
return new ConstantTypedExpr(
    toType(literal.getType()),
    toVariant(literal),
    null);
Could use ConstantTypedExpr.create(toVariant(literal))
The create method calls a native method. I think we had better not call native code on the client side, so that we do not need to load native libraries on the Flink client side.
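To illustrate the concern, here is a minimal sketch of keeping plan construction pure Java and deferring native loading until execution. Everything in it is hypothetical: `ConstantExpr`, `planOnClient`, and `executeOnTask` are made-up names, and the native load is simulated with a counter rather than a real `System.loadLibrary` call. It relies on the JVM initializing a nested class only on first use.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch; the native library load is simulated with a counter.
final class LazyNativeSketch {
  /** Counts how many times the simulated native library was loaded. */
  static final AtomicInteger LOADS = new AtomicInteger();

  /** Holder idiom: this static initializer runs only when the class is first used. */
  private static final class NativeHolder {
    static {
      // A real implementation would load the native library here.
      LOADS.incrementAndGet();
    }

    static long evaluate(long value) {
      return value;
    }
  }

  /** Pure-Java plan description; constructing it touches no native code. */
  record ConstantExpr(String typeName, long value) {}

  /** Client-side path: builds the expression without touching NativeHolder. */
  static ConstantExpr planOnClient(long value) {
    return new ConstantExpr("BIGINT", value);
  }

  /** Task-side path: the first call triggers the simulated native load. */
  static long executeOnTask(ConstantExpr expr) {
    return NativeHolder.evaluate(expr.value());
  }

  public static void main(String[] args) {
    ConstantExpr expr = planOnClient(42L);
    System.out.println("loads after planning: " + LOADS.get());
    executeOnTask(expr);
    System.out.println("loads after execution: " + LOADS.get());
  }
}
```

Under this assumption the client only ever calls constructors like the one in the diff above, and factory methods that reach into native code stay on the task side.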
// Add a mock input because Velox does not allow an empty source.
PlanNode mockInput = new ValuesNode(
    String.valueOf(ExecNodeContext.newNodeId()),
    "",
    false,
    1);
Adding a scan node so we can bind the split to it. E.g.,
final TableScanNode scanNode = new TableScanNode(
"id-1",
...(type),
new ExternalStreamTableHandle("connector-external-stream"),
List.of()
);
case VARCHAR:
  return new VarCharValue(literal.getValue().toString());
case BINARY:
  return new VarBinaryValue(literal.getValue().toString());
Should we use VarBinaryValue.create() to pass a byte array in?
att
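As a general illustration of why a byte array is the safer carrier for binary values, the sketch below round-trips arbitrary bytes through a Java String and compares the result with the input. It does not reproduce what literal.getValue().toString() returns in the PR; `viaString` and `viaBytes` are hypothetical helpers that only show the hazard of routing binary data through text.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helpers; they only demonstrate the text-vs-bytes hazard.
final class BinaryLiteralSketch {
  /** Lossy path: decode the bytes as UTF-8 text, then encode the text back to bytes. */
  static byte[] viaString(byte[] raw) {
    return new String(raw, StandardCharsets.UTF_8).getBytes(StandardCharsets.UTF_8);
  }

  /** Safe path: hand over a copy of the raw bytes unchanged. */
  static byte[] viaBytes(byte[] raw) {
    return raw.clone();
  }

  public static void main(String[] args) {
    // 0xFF and a trailing 0xC3 are not valid UTF-8 sequences on their own.
    byte[] raw = {(byte) 0xFF, 0x00, (byte) 0xC3};
    System.out.println("via String preserved: " + Arrays.equals(raw, viaString(raw)));
    System.out.println("via bytes preserved:  " + Arrays.equals(raw, viaBytes(raw)));
  }
}
```

Malformed sequences are replaced during decoding, so the String path cannot return the original bytes for inputs like this one.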
After trying to run this POC, I got an error.
You can try the latest code.
Thanks for the reply. I reran with the latest commit and got:
Caused by: io.github.zhztheplayer.velox4j.exception.VeloxException: com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "sources" (class io.github.zhztheplayer.velox4j.plan.TableScanNode), not marked as ignorable (4 known properties: "id", "tableHandle", "outputType", "assignments"])
at [Source: UNKNOWN; byte offset: #UNKNOWN] (through reference chain: io.github.zhztheplayer.velox4j.plan.TableScanNode["sources"])
I looked at TableScanNode and noticed that a JSON getter exists for "sources", but the field itself is absent. I then rebuilt velox4j with a Serde change (disabling Jackson's DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES) and got:
Caused by: io.github.zhztheplayer.velox4j.exception.VeloxException: couldn't find key name in dynamic object
at io.github.zhztheplayer.velox4j.jni.JniWrapper.executeQuery(Native Method)
at io.github.zhztheplayer.velox4j.jni.JniApi.executeQuery(JniApi.java:52)
at io.github.zhztheplayer.velox4j.query.Queries.execute(Queries.java:14)
at org.apache.gluten.table.runtime.operators.GlutenCalOperator.processElement(GlutenCalOperator.java:97)
SQL query:
CREATE TABLE srcTbl (id INT, price INT, name STRING) WITH ('connector'='datagen');
CREATE TABLE snkTbl (id INT, price INT) WITH ('connector'='blackhole');
INSERT INTO snkTbl SELECT id, price FROM srcTbl WHERE price > 10;
What changes were proposed in this pull request?
This PR is the Java side of the POC to support Flink. It generates a GlutenCalOperator that runs filters natively.
The draft design is here.